kafka connector 向mysql写数据时 如何指定key和value的schema? |
您所在的位置:网站首页 › kafka sink mysql › kafka connector 向mysql写数据时 如何指定key和value的schema? |
恰巧,本人最近在做一个这个方面的专题研究,我来说说自己对这你这个问题的看法。 首先,你是从kafka 的topic 把数据流向 mysql,要用的Kafka Connector应该是Sink类型的。目前,Confluent 3.3已经有Kafka的JDBC Connector,可以完成这个事情。 第二,如果你从Conflent的官网下载了Confluent (本回答发生时,为3.3 版本,分为社区版和企业版,下载哪个版本,要看你自己情况),里面自带了 jdbc的Connector,这个Connector包含了两个部分,一个是Source类型的(就是通过jdbc把数据导入到Kafka的),另外一个是Sink类型的(就是把Kafka的topic数据直接导入到jdbc连接的数据库的)。 第三,从你的问题知道,你应该已经接触了Kafka Connector了,关于怎么安装、启动服务、启动Connector我就不赘述了,你应该已经知道了。我要说的一点,就是,你通过设置 一个 xxx.properties文件,这个文件的内容大致如下: name=test-sink connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 topics=orders connection.url=jdbc:mysql://mysqlhost:port/dbname?name=xxx&password=xxxx auto.create=true具体字段意义很明了,然后通过启动Connector的命令启动,我从官网上摘下来这个命令你给参考下: ./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink-mysql.properties(这个文件就是上面定义的xxx.properties)这样,你Kafka里面的 orders 的topic的内容,就会在你指定的mysql数据库中创建一个orders表,关于你说的Schema,并不需要指定。 另外,需要注意的一点,当你用上面的命令启动一个connector的时候, etc/schema-registry/connect-avro-standalone.properties这个参数配置文件中,会启动一个restful服务,这个服务默认端口是8083,如果你在一台主机同时启动多个connector的时候,需要修改配置文件中的端口号。 现在从另外一个角度来回答你标题中的问题:如果你用 avro-console-productor的时候,是要指定Schema的,但是用connector的时候,不需要这么做。 不知道我的回答是否能解决你的问题,也欢迎大家拍砖! |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |